#include <unistd.h>
#include <string.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/time.h>
#include <time.h>
#include <pthread.h>
#include <semaphore.h>
#include "../src/bmtp2.h"

/* Guards the static counters kept inside the on_open / on_ack callbacks. */
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
/* Posted by the callbacks to wake main() at each phase boundary. */
sem_t sem;

/* Command-line settings; each can be overridden through getopt in main(). */
int n = 1;      /* -n: number of messages to publish */
int c = 1;      /* -c: number of connection instances */
int t = 1;      /* -t: request duration limit (parsed; not read elsewhere in this file) */
int p = 1039;   /* -p: server port */
int q = 0;      /* -q: QoS level, 0..2 (parsed; not read elsewhere in this file) */
char *host = "127.0.0.1";   /* -h: server host */

/* Wall-clock timestamps in milliseconds bracketing the publish phase. */
unsigned long long int pub_start;
unsigned long long int pub_end;

/*
 * Connection-opened callback.
 *
 * Counts how many connections have opened so far and, once the count
 * reaches the configured number of instances (`c`), prints "Done." and
 * posts `sem` so that main() can proceed.
 *
 * The count is copied into a local while the mutex is still held: comparing
 * the shared counter after unlocking would be a data race and could let two
 * callbacks both observe the final value and post the semaphore twice.
 */
void on_open( BMTP *bmtp ) {
    static int opened = 0;
    int seen;

    (void)bmtp;

    pthread_mutex_lock(&mutex);
    seen = ++opened;
    pthread_mutex_unlock(&mutex);

    if( seen == c ) {
        printf("Done.\n");

        sem_post(&sem);
    }
}

/* Connection-closed callback: writes a "close" line to stdout. */
void on_close( BMTP *bmtp ) {
    fputs("close\n", stdout);
}

/*
 * Error callback: prints the error number to stdout, except for error
 * number 2, which is silently ignored.
 * NOTE(review): the meaning of error 2 is not visible in this file.
 */
void on_error( BMTP *bmtp, int err_no ) {
    if( err_no == 2 ) {
        return;
    }

    printf("error: %d\n", err_no);
}

/*
 * Incoming-publish callback: prints the first `data_len` bytes of the
 * message payload followed by a newline.
 */
void on_pub( BMTP *bmtp, BMTP_MSG *msg) {
    /* "%.*s" takes its precision as an int. */
    const int payload_len = (int)msg->data_len;

    printf("%.*s\n", payload_len, msg->data);
}

/*
 * Publish-acknowledged callback.
 *
 * Counts acknowledgements, prints a progress dot every 100000 of them and,
 * when the count reaches the configured message total (`n`), records the
 * end timestamp in `pub_end` (milliseconds) and posts `sem` to wake main().
 *
 * The count is copied into a local while the mutex is held; reading the
 * shared counter after unlocking would be a data race and could make two
 * callbacks both see the final value (double sem_post) or print a
 * duplicate progress dot.
 */
void on_ack( BMTP *bmtp, BMTP_PACKAGE *msg, int status ) {
    static int acked = 0;
    int seen;

    (void)bmtp;
    (void)msg;
    (void)status;

    pthread_mutex_lock(&mutex);
    seen = ++acked;
    pthread_mutex_unlock(&mutex);

    if( seen % 100000 == 0 ) {
        printf(".");
        fflush( stdout );
    }

    if( seen == n ) {
        struct timeval tv;
        gettimeofday(&tv, NULL);
        printf("Done.\n\n");

        /* Widen before multiplying so a 32-bit time_t cannot overflow. */
        pub_end = (unsigned long long)tv.tv_sec * 1000ULL
                + (unsigned long long)tv.tv_usec / 1000ULL;

        sem_post(&sem);
    }
}

/*
 * Benchmark payload: the same small JSON command repeated five times.
 * Defined once so that the length and the data below always agree.
 */
#define MSG_1_PAYLOAD \
    "{\"cmd\":\"GET_PHOTO\"}" \
    "{\"cmd\":\"GET_PHOTO\"}" \
    "{\"cmd\":\"GET_PHOTO\"}" \
    "{\"cmd\":\"GET_PHOTO\"}" \
    "{\"cmd\":\"GET_PHOTO\"}"

/*
 * Message published by the benchmark. Initializers are positional, so their
 * order must match the BMTP_MSG field order declared in bmtp2.h.
 */
BMTP_MSG msg_1 = {
    0,                      /* msg_id */
    1,                      /* stream_id */
    100,                    /* producer_id */
    100,                    /* consumer_id */
    0,                      /* create */
    0,                      /* effect */
    10,                     /* expire */
    2,                      /* type */
    0,                      /* is_compress */
    0,                      /* is_lastwill */
    sizeof(MSG_1_PAYLOAD),  /* data_len: sizeof counts the terminating NUL */
    MSG_1_PAYLOAD           /* data */
};

/*
 * Benchmark driver.
 *
 * Parses the command line, opens `c` connections, waits until on_open has
 * counted all of them, publishes msg_1 `n` times round-robin across the
 * connections, waits until on_ack has counted `n` acknowledgements, and
 * prints throughput statistics.
 *
 * Options: -n messages, -c connections, -t duration limit, -h host,
 *          -p port, -q QoS (0..2). -t and -q are parsed but not otherwise
 *          used by this function.
 *
 * Returns 0 on success; 1 on an invalid option, an allocation failure, or
 * a failed connection.
 */
int main(int argc, char ** argv) {
    int ch;
    opterr = 0; /* silence getopt's own diagnostics; errors are reported below */
    while( ( ch = getopt(argc,argv,"n:c:t:h:p:q:") ) != -1 ) {
        switch(ch) {
            case 'n':
                // number of messages
                n = atoi(optarg);
                if( n < 1 ) {
                    /* n == 0 would wait forever for an ack that never comes */
                    printf("option -n expect a positive integer\n");
                    return 1;
                }
                break;
            case 'c':
                // number of instances (connections)
                c = atoi(optarg);
                if( c < 1 ) {
                    /* c is used as a modulus and as an allocation count */
                    printf("option -c expect a positive integer\n");
                    return 1;
                }
                break;
            case 't':
                // request duration limit
                t = atoi(optarg);
                break;
            case 'h':
                // host
                host = optarg;
                break;
            case 'p':
                // port
                p = atoi(optarg);
                if( p < 1 || p > 65535 ) {
                    printf("option -p expect [1-65535]\n");
                    return 1;
                }
                break;
            case 'q':
                // QoS
                q = atoi(optarg);
                if( q != 0 && q != 1 && q != 2 ) {
                    printf("option -q expect [0-2]\n");
                    return 1;
                }
                break;
            case '?':
                printf("invalid option: -%c\n", optopt);
                return 1;
            default:
                printf("invalid option: -%c\n", ch);
                return 1;
        }
    }

    printf("BitMQ Benchmark<V0.1.0> \n\n");

    sem_init(&sem, 0, 0);

    bmtp_init();

    printf("Connecting %s:%d.", host, p);
    fflush( stdout );

    BMTP **bmtp = calloc((size_t)c, sizeof *bmtp);
    if( !bmtp ) {
        printf("\nOut of memory.\n");
        return 1;
    }

    for( int i = 0; i < c; i++ ) {
        bmtp[i] = bmtp_new(host, p, BMTP_SECURE_DISABLE);
        if( !bmtp[i] ) {
            printf("\nCan't connect to %s:%d.\n", host, p);
            return 1;
        }

        bmtp_set_on_open( bmtp[i], on_open);
        bmtp_set_on_close(bmtp[i], on_close);
        bmtp_set_on_error(bmtp[i], on_error);
        bmtp_set_on_pub(  bmtp[i], on_pub);
        bmtp_set_on_ack(  bmtp[i], on_ack );

        bmtp_conn(bmtp[i], NULL, 0);
    }

    /* Block until on_open has seen all `c` connections open. */
    sem_wait(&sem);

    struct timeval tv;
    gettimeofday(&tv, NULL);
    /* Widen before multiplying so a 32-bit time_t cannot overflow. */
    pub_start = (unsigned long long)tv.tv_sec * 1000ULL
              + (unsigned long long)tv.tv_usec / 1000ULL;

    printf("Benchmarking %s:%d (be patient).", host, p);
    fflush( stdout );

    /* Publish n messages, spreading them round-robin over the connections. */
    int failed = 0;
    for( int remaining = n; remaining > 0; remaining-- ) {
        if( bmtp_pub(bmtp[remaining % c], &msg_1) != 0 ) {
            printf("pub failed\n");
            failed++;
        }
    }

    /* Block until on_ack has counted `n` acknowledgements. */
    sem_wait(&sem);

    /* Floating point throughout: n * data_len can exceed INT_MAX. */
    double seconds = (double)(pub_end - pub_start) / 1000.0;
    /* NOTE(review): the +3 is presumably per-message framing overhead — confirm. */
    double total_transfer = (double)n * ((double)msg_1.data_len + 3.0);
    double msg_transfer = (double)n * (double)msg_1.data_len;

    printf("Concurrency Level:      %d\n", c);
    printf("Time taken for tests:   %.2f seconds\n", seconds);
    printf("Complete publish:       %d\n", n);
    printf("Failed publish:         %d\n", failed);
    printf("Total transferred:      %.2f Mbytes\n", total_transfer/1024/1024);
    printf("Message transferred:    %.2f Mbytes\n", msg_transfer/1024/1024);
    printf("Publish per second:     %.2f [#/sec]\n", (double)n/seconds);
    /* Label is in milliseconds, so convert from seconds. */
    printf("Time per publish:       %.2f [ms]\n", seconds*1000.0/n);
    printf("Transfer rate:          %.2f [Mbytes/sec]\n", total_transfer/seconds/1024/1024);

    return 0;
}


